Hudi Docker Demo

适用于版本0.10.1。

以下步骤在经过MacBook测试。

1 准备

  • 硬件

    执行Spark SQL查询需要至少6GB内存和4个CPU核心

  • Docker环境

  • kcat

    从命令行生产消费kafka主题的工具

  • /etc/hosts

    映射容器服务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    127.0.0.1 adhoc-1
    127.0.0.1 adhoc-2
    127.0.0.1 namenode
    127.0.0.1 datanode1
    127.0.0.1 hiveserver
    127.0.0.1 hivemetastore
    127.0.0.1 kafkabroker
    127.0.0.1 sparkmaster
    127.0.0.1 zookeeper
  • JDK 8

  • Maven

  • jq

    用于命令行处理json

2 部署Docker集群

(1) 编译Hudi

当前默认使用Scala 2.11

1
2
cd <HUDI_WORKSPACE>
mvn package -DskipTests

(2) 启动集群

执行docker compose脚本并配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
$ cd docker
$ ./setup_demo.sh
....
....
....
[+] Running 10/13
⠿ Container zookeeper Removed 8.6s
⠿ Container datanode1 Removed 18.3s
⠿ Container trino-worker-1 Removed 50.7s
⠿ Container spark-worker-1 Removed 16.7s
⠿ Container adhoc-2 Removed 16.9s
⠿ Container graphite Removed 16.9s
⠿ Container kafkabroker Removed 14.1s
⠿ Container adhoc-1 Removed 14.1s
⠿ Container presto-worker-1 Removed 11.9s
⠿ Container presto-coordinator-1 Removed 34.6s
.......
......
[+] Running 17/17
⠿ adhoc-1 Pulled 2.9s
⠿ graphite Pulled 2.8s
⠿ spark-worker-1 Pulled 3.0s
⠿ kafka Pulled 2.9s
⠿ datanode1 Pulled 2.9s
⠿ hivemetastore Pulled 2.9s
⠿ hiveserver Pulled 3.0s
⠿ hive-metastore-postgresql Pulled 2.8s
⠿ presto-coordinator-1 Pulled 2.9s
⠿ namenode Pulled 2.9s
⠿ trino-worker-1 Pulled 2.9s
⠿ sparkmaster Pulled 2.9s
⠿ presto-worker-1 Pulled 2.9s
⠿ zookeeper Pulled 2.8s
⠿ adhoc-2 Pulled 2.9s
⠿ historyserver Pulled 2.9s
⠿ trino-coordinator-1 Pulled 2.9s
[+] Running 17/17
⠿ Container zookeeper Started 41.0s
⠿ Container kafkabroker Started 41.7s
⠿ Container graphite Started 41.5s
⠿ Container hive-metastore-postgresql Running 0.0s
⠿ Container namenode Running 0.0s
⠿ Container hivemetastore Running 0.0s
⠿ Container trino-coordinator-1 Runni... 0.0s
⠿ Container presto-coordinator-1 Star... 42.1s
⠿ Container historyserver Started 41.0s
⠿ Container datanode1 Started 49.9s
⠿ Container hiveserver Running 0.0s
⠿ Container trino-worker-1 Started 42.1s
⠿ Container sparkmaster Started 41.9s
⠿ Container spark-worker-1 Started 50.2s
⠿ Container adhoc-2 Started 38.5s
⠿ Container adhoc-1 Started 38.5s
⠿ Container presto-worker-1 Started 38.4s
Copying spark default config and setting up configs
Copying spark default config and setting up configs
$ docker ps

至此,以下服务准备就绪:

  • HDFS(包含命名节点和数据节点)
  • Spark主从节点
  • Hive服务(包含Metastore、PostgresDB支持的HiveServer2)
  • Kafka Broker(作为输入源)和一个ZooKeeper节点
  • Presto主从节点
  • Trino主从节点
  • Hive with Hudi命令行即时查询容器

3 示例

示例数据为股票追踪数据,按照分钟粒度呈现,存放在docer/demo/data下。其中包含两批数据,第一批早9点半到10点半,第二批早10点半到11点,两批之间存在数据交叉。

(1) 发布首批数据到kafka

上传第一批数据到stock ticks主题中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 上传数据
$ cat docker/demo/data/batch_1.json | kcat -b kafkabroker -t stock_ticks -P

# 检查数据
$ kcat -b kafkabroker -L -J | jq .

{
"originating_broker": {
"id": 1001,
"name": "kafkabroker:9092/1001"
},
"query": {
"topic": "*"
},
"brokers": [
{
"id": 1001,
"name": "kafkabroker:9092"
}
],
"topics": [
{
"topic": "stock_ticks",
"partitions": [
{
"partition": 0,
"leader": 1001,
"replicas": [
{
"id": 1001
}
],
"isrs": [
{
"id": 1001
}
]
}
]
}
]
}

(2) 从kafka主题中增量消费数据

DeltaStreamer可以连接多种数据源(包含kafka),并且应用变化到Hudi表中。

此处使用DeltaStreamer从kafka主题中下载json数据并初始化Hudi COW/MOR表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
docker exec -it adhoc-2 /bin/bash

# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow table in HDFS
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor table in HDFS
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props /var/demo/config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction

# As part of the setup (Look at setup_demo.sh), the configs needed for DeltaStreamer is uploaded to HDFS. The configs
# contain mostly Kafa connectivity settings, the avro-schema to be used for ingesting along with key and partitioning fields.

exit

可以通过HDFS浏览器查看创建的表,如http://namenode:50070/explorer.html#/user/hive/warehouse/stock_ticks_cow

分区文件夹中,.hoodie下的commit或deltacommit文件标志提交成功

(3) 同步Hive

至此HDFS上的表文件已经准备就绪,需要创建Hive表以执行Hive查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
docker exec -it adhoc-2 /bin/bash

# This command takes in HiveServer URL and COW Hudi table location in HDFS and sync the HDFS state to Hive
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
--jdbc-url jdbc:hive2://hiveserver:10000 \
--user hive \
--pass hive \
--partitioned-by dt \
--base-path /user/hive/warehouse/stock_ticks_cow \
--database default \
--table stock_ticks_cow
.....
2020-01-25 19:51:28,953 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(129)) - Sync complete for stock_ticks_cow
.....

# Now run hive-sync for the second data-set in HDFS using Merge-On-Read (MOR table type)
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
--jdbc-url jdbc:hive2://hiveserver:10000 \
--user hive \
--pass hive \
--partitioned-by dt \
--base-path /user/hive/warehouse/stock_ticks_mor \
--database default \
--table stock_ticks_mor
...
2020-01-25 19:51:51,066 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(129)) - Sync complete for stock_ticks_mor_ro
...
2020-01-25 19:51:51,569 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(129)) - Sync complete for stock_ticks_mor_rt
....

exit

(4) 查询

1) Hive查询

查询股票代码GOOG最近的数据时间戳。由于Hudi为首批数据创建的是parquet文件,因此RO表和RT表查询结果一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false

# List Tables
0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+
| tab_name |
+---------------------+--+
| stock_ticks_cow |
| stock_ticks_mor_ro |
| stock_ticks_mor_rt |
+---------------------+--+
3 rows selected (1.199 seconds)
0: jdbc:hive2://hiveserver:10000>


# Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions stock_ticks_mor_rt;
+----------------+--+
| partition |
+----------------+--+
| dt=2018-08-31 |
+----------------+--+
1 row selected (0.24 seconds)


# COPY-ON-WRITE Queries:
=========================


0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+

Now, run a projection query:

0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924221953 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+


# Merge-On-Read Queries:
==========================

Lets run similar queries against M-O-R table. Lets look at both
ReadOptimized and Snapshot(realtime data) queries supported by M-O-R table

# Run ReadOptimized Query. Notice that the latest timestamp is 10:29
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
1 row selected (6.326 seconds)


# Run Snapshot Query. Notice that the latest timestamp is again 10:29

0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
1 row selected (1.606 seconds)


# Run Read Optimized and Snapshot project queries

0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

exit

2) Spark SQL查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
docker exec -it adhoc-1 /bin/bash
$SPARK_INSTALL/bin/spark-shell \
--jars $HUDI_SPARK_BUNDLE \
--master local[2] \
--driver-class-path $HADOOP_CONF_DIR \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 3G \
--num-executors 1 \
--packages org.apache.spark:spark-avro_2.11:2.4.4
...

Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.4
/_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show tables").show(100, false)
+--------+------------------+-----------+
|database|tableName |isTemporary|
+--------+------------------+-----------+
|default |stock_ticks_cow |false |
|default |stock_ticks_mor_ro|false |
|default |stock_ticks_mor_rt|false |
+--------+------------------+-----------+

# Copy-On-Write Table

## Run max timestamp query against COW table

scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
[Stage 0:> (0 + 1) / 1]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes#StaticLoggerBinder for further details.
+------+-------------------+
|symbol|max(ts) |
+------+-------------------+
|GOOG |2018-08-31 10:29:00|
+------+-------------------+

## Projection Query

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts |volume|open |close |
+-------------------+------+-------------------+------+---------+--------+
|20180924221953 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
|20180924221953 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

# Merge-On-Read Queries:
==========================

Lets run similar queries against M-O-R table. Lets look at both
ReadOptimized and Snapshot queries supported by M-O-R table

# Run ReadOptimized Query. Notice that the latest timestamp is 10:29
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts) |
+------+-------------------+
|GOOG |2018-08-31 10:29:00|
+------+-------------------+


# Run Snapshot Query. Notice that the latest timestamp is again 10:29

scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts) |
+------+-------------------+
|GOOG |2018-08-31 10:29:00|
+------+-------------------+

# Run Read Optimized and Snapshot project queries

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts |volume|open |close |
+-------------------+------+-------------------+------+---------+--------+
|20180924222155 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
|20180924222155 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+-------------------+------+-------------------+------+---------+--------+
|_hoodie_commit_time|symbol|ts |volume|open |close |
+-------------------+------+-------------------+------+---------+--------+
|20180924222155 |GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |
|20180924222155 |GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|
+-------------------+------+-------------------+------+---------+--------+

3) Presto查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
docker exec -it presto-worker-1 presto --server presto-coordinator-1:8090
presto> show catalogs;
Catalog
-----------
hive
jmx
localfile
system
(4 rows)

Query 20190817_134851_00000_j8rcz, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:04 [0 rows, 0B] [0 rows/s, 0B/s]

presto> use hive.default;
USE
presto:default> show tables;
Table
--------------------
stock_ticks_cow
stock_ticks_mor_ro
stock_ticks_mor_rt
(3 rows)

Query 20190822_181000_00001_segyw, FINISHED, 2 nodes
Splits: 19 total, 19 done (100.00%)
0:05 [3 rows, 99B] [0 rows/s, 18B/s]


# COPY-ON-WRITE Queries:
=========================


presto:default> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
symbol | _col1
--------+---------------------
GOOG | 2018-08-31 10:29:00
(1 row)

Query 20190822_181011_00002_segyw, FINISHED, 1 node
Splits: 49 total, 49 done (100.00%)
0:12 [197 rows, 613B] [16 rows/s, 50B/s]

presto:default> select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
_hoodie_commit_time | symbol | ts | volume | open | close
---------------------+--------+---------------------+--------+-----------+----------
20190822180221 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
20190822180221 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085
(2 rows)

Query 20190822_181141_00003_segyw, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:02 [197 rows, 613B] [109 rows/s, 341B/s]


# Merge-On-Read Queries:
==========================

Lets run similar queries against M-O-R table.

# Run ReadOptimized Query. Notice that the latest timestamp is 10:29
presto:default> select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
symbol | _col1
--------+---------------------
GOOG | 2018-08-31 10:29:00
(1 row)

Query 20190822_181158_00004_segyw, FINISHED, 1 node
Splits: 49 total, 49 done (100.00%)
0:02 [197 rows, 613B] [110 rows/s, 343B/s]


presto:default> select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
_hoodie_commit_time | symbol | ts | volume | open | close
---------------------+--------+---------------------+--------+-----------+----------
20190822180250 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
20190822180250 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085
(2 rows)

Query 20190822_181256_00006_segyw, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:02 [197 rows, 613B] [92 rows/s, 286B/s]

presto:default> exit

4) Trino查询

(5) 上传第二批数据并使用Delta Streamer消费

上传第二批数据。由于没有创建新的分区,因此无需同步Hive。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
cat docker/demo/data/batch_2.json | kcat -b kafkabroker -t stock_ticks -P

# Within Docker container, run the ingestion command
docker exec -it adhoc-2 /bin/bash

# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow table in HDFS
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
--table-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /user/hive/warehouse/stock_ticks_cow \
--target-table stock_ticks_cow \
--props /var/demo/config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor table in HDFS
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
--table-type MERGE_ON_READ \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field ts \
--target-base-path /user/hive/warehouse/stock_ticks_mor \
--target-table stock_ticks_mor \
--props /var/demo/config/kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--disable-compaction

exit

(6) 查询

第二次提交,对于COW表将产生新的parquet文件,而对于MOR表只是追加到log变化文件中。

因此MOR表读优化由于只读区parquet文件,将不会反应最新变化,而快照读可以。

1) Hive查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false

# Copy On Write Table:

0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:59:00 |
+---------+----------------------+--+
1 row selected (1.932 seconds)

0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924224524 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

As you can notice, the above queries now reflect the changes that came as part of ingesting second batch.


# Merge On Read Table:

# Read Optimized Query
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+--+
1 row selected (1.6 seconds)

0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

# Snapshot Query
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:59:00 |
+---------+----------------------+--+

0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924224537 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

exit

2) Spark SQL查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
docker exec -it adhoc-1 /bin/bash
$SPARK_INSTALL/bin/spark-shell \
--jars $HUDI_SPARK_BUNDLE \
--driver-class-path $HADOOP_CONF_DIR \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--master local[2] \
--executor-memory 3G \
--num-executors 1 \
--packages org.apache.spark:spark-avro_2.11:2.4.4

# Copy On Write Table:

scala> spark.sql("select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG'").show(100, false)
+------+-------------------+
|symbol|max(ts) |
+------+-------------------+
|GOOG |2018-08-31 10:59:00|
+------+-------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG'").show(100, false)

+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924221953 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924224524 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

As you can notice, the above queries now reflect the changes that came as part of ingesting second batch.


# Merge On Read Table:

# Read Optimized Query
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+---------+----------------------+
| symbol | _c1 |
+---------+----------------------+
| GOOG | 2018-08-31 10:29:00 |
+---------+----------------------+
1 row selected (1.6 seconds)

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+

# Snapshot Query
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+---------+----------------------+
| symbol | _c1 |
+---------+----------------------+
| GOOG | 2018-08-31 10:59:00 |
+---------+----------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924224537 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+

exit

3) Presto查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
docker exec -it presto-worker-1 presto --server presto-coordinator-1:8090
presto> use hive.default;
USE

# Copy On Write Table:

presto:default>select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
symbol | _col1
--------+---------------------
GOOG | 2018-08-31 10:59:00
(1 row)

Query 20190822_181530_00007_segyw, FINISHED, 1 node
Splits: 49 total, 49 done (100.00%)
0:02 [197 rows, 613B] [125 rows/s, 389B/s]

presto:default>select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
_hoodie_commit_time | symbol | ts | volume | open | close
---------------------+--------+---------------------+--------+-----------+----------
20190822180221 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
20190822181433 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215
(2 rows)

Query 20190822_181545_00008_segyw, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:02 [197 rows, 613B] [106 rows/s, 332B/s]

As you can notice, the above queries now reflect the changes that came as part of ingesting second batch.


# Merge On Read Table:

# Read Optimized Query
presto:default> select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
symbol | _col1
--------+---------------------
GOOG | 2018-08-31 10:29:00
(1 row)

Query 20190822_181602_00009_segyw, FINISHED, 1 node
Splits: 49 total, 49 done (100.00%)
0:01 [197 rows, 613B] [139 rows/s, 435B/s]

presto:default>select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
_hoodie_commit_time | symbol | ts | volume | open | close
---------------------+--------+---------------------+--------+-----------+----------
20190822180250 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
20190822180250 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085
(2 rows)

Query 20190822_181615_00010_segyw, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [197 rows, 613B] [154 rows/s, 480B/s]

presto:default> exit

4) Trino查询

(7) 增量查询

1) Hive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false

0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL;
No rows affected (0.009 seconds)
0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.max.commits=3;
No rows affected (0.009 seconds)
0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_cow.consume.start.timestamp=20180924064621;

0: jdbc:hive2://hiveserver:10000>
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '20180924064621';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924065039 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
1 row selected (0.83 seconds)
0: jdbc:hive2://hiveserver:10000>

2) Spark Shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
docker exec -it adhoc-1 /bin/bash
$SPARK_INSTALL/bin/spark-shell \
--jars $HUDI_SPARK_BUNDLE \
--driver-class-path $HADOOP_CONF_DIR \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--master local[2] \
--executor-memory 3G \
--num-executors 1 \
--packages org.apache.spark:spark-avro_2.11:2.4.4

Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.4
/_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceReadOptions

# In the below query, 20180925045257 is the first commit's timestamp
scala> val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20180924064621").load("/user/hive/warehouse/stock_ticks_cow")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes#StaticLoggerBinder for further details.
hoodieIncViewDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 15 more fields]

scala> hoodieIncViewDF.registerTempTable("stock_ticks_cow_incr_tmp1")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_incr_tmp1 where symbol = 'GOOG'").show(100, false);
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20180924065039 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+

(8) 调度并对MOR表压缩

使用Hudi CLI调度并压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
docker exec -it adhoc-1 /bin/bash
root@adhoc-1:/opt# /var/hoodie/ws/hudi-cli/hudi-cli.sh
...
Table command getting loaded
HoodieSplashScreen loaded
===================================================================
* ___ ___ *
* /\__\ ___ /\ \ ___ *
* / / / /\__\ / \ \ /\ \ *
* / /__/ / / / / /\ \ \ \ \ \ *
* / \ \ ___ / / / / / \ \__\ / \__\ *
* / /\ \ /\__\ / /__/ ___ / /__/ \ |__| / /\/__/ *
* \/ \ \/ / / \ \ \ /\__\ \ \ \ / / / /\/ / / *
* \ / / \ \ / / / \ \ / / / \ /__/ *
* / / / \ \/ / / \ \/ / / \ \__\ *
* / / / \ / / \ / / \/__/ *
* \/__/ \/__/ \/__/ Apache Hudi CLI *
* *
===================================================================

Welcome to Apache Hudi CLI. Please type help if you are looking for help.
hudi->connect --path /user/hive/warehouse/stock_ticks_mor
18/09/24 06:59:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/09/24 06:59:35 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor
18/09/24 06:59:35 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]]
18/09/24 06:59:35 INFO table.HoodieTableConfig: Loading table properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties
18/09/24 06:59:36 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1) from /user/hive/warehouse/stock_ticks_mor
Metadata for table stock_ticks_mor loaded
hoodie:stock_ticks_mor->compactions show all
20/02/10 03:41:32 INFO timeline.HoodieActiveTimeline: Loaded instants [[20200210015059__clean__COMPLETED], [20200210015059__deltacommit__COMPLETED], [20200210022758__clean__COMPLETED], [20200210022758__deltacommit__COMPLETED], [==>20200210023843__compaction__REQUESTED]]
___________________________________________________________________
| Compaction Instant Time| State | Total FileIds to be Compacted|
|==================================================================|

# Schedule a compaction. This will use Spark Launcher to schedule compaction
hoodie:stock_ticks_mor->compaction schedule
....
Compaction successfully completed for 20180924070031

# Now refresh and check again. You will see that there is a new compaction requested

hoodie:stock_ticks->connect --path /user/hive/warehouse/stock_ticks_mor
18/09/24 07:01:16 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor
18/09/24 07:01:16 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]]
18/09/24 07:01:16 INFO table.HoodieTableConfig: Loading table properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties
18/09/24 07:01:16 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1) from /user/hive/warehouse/stock_ticks_mor
Metadata for table stock_ticks_mor loaded

hoodie:stock_ticks_mor->compactions show all
18/09/24 06:34:12 INFO timeline.HoodieActiveTimeline: Loaded instants [[20180924041125__clean__COMPLETED], [20180924041125__deltacommit__COMPLETED], [20180924042735__clean__COMPLETED], [20180924042735__deltacommit__COMPLETED], [==>20180924063245__compaction__REQUESTED]]
___________________________________________________________________
| Compaction Instant Time| State | Total FileIds to be Compacted|
|==================================================================|
| 20180924070031 | REQUESTED| 1 |

# Execute the compaction. The compaction instant value passed below must be the one displayed in the above "compactions show all" query
hoodie:stock_ticks_mor->compaction run --compactionInstant 20180924070031 --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
....
Compaction successfully completed for 20180924070031

## Now check if compaction is completed

hoodie:stock_ticks_mor->connect --path /user/hive/warehouse/stock_ticks_mor
18/09/24 07:03:00 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from /user/hive/warehouse/stock_ticks_mor
18/09/24 07:03:00 INFO util.FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs://namenode:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml], FileSystem: [DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1261652683_11, ugi=root (auth:SIMPLE)]]]
18/09/24 07:03:00 INFO table.HoodieTableConfig: Loading table properties from /user/hive/warehouse/stock_ticks_mor/.hoodie/hoodie.properties
18/09/24 07:03:00 INFO table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1) from /user/hive/warehouse/stock_ticks_mor
Metadata for table stock_ticks_mor loaded

hoodie:stock_ticks->compactions show all
18/09/24 07:03:15 INFO timeline.HoodieActiveTimeline: Loaded instants [[20180924064636__clean__COMPLETED], [20180924064636__deltacommit__COMPLETED], [20180924065057__clean__COMPLETED], [20180924065057__deltacommit__COMPLETED], [20180924070031__commit__COMPLETED]]
___________________________________________________________________
| Compaction Instant Time| State | Total FileIds to be Compacted|
|==================================================================|
| 20180924070031 | COMPLETED| 1 |

(9) Hive查询(包含增量数据)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false

# Read Optimized Query
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:59:00 |
+---------+----------------------+--+
1 row selected (1.6 seconds)

0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

# Snapshot Query
0: jdbc:hive2://hiveserver:10000> select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG';
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
+---------+----------------------+--+
| symbol | _c1 |
+---------+----------------------+--+
| GOOG | 2018-08-31 10:59:00 |
+---------+----------------------+--+

0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

# Incremental Query:

0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL;
No rows affected (0.008 seconds)
# Max-Commits covers both second batch and compaction commit
0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.max.commits=3;
No rows affected (0.007 seconds)
0: jdbc:hive2://hiveserver:10000> set hoodie.stock_ticks_mor.consume.start.timestamp=20180924064636;
No rows affected (0.013 seconds)
# Query:
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '20180924064636';
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+--+

exit

(10) 使用Spark SQL对压缩后MOR表读优化查询和快照查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
docker exec -it adhoc-1 /bin/bash
$SPARK_INSTALL/bin/spark-shell \
--jars $HUDI_SPARK_BUNDLE \
--driver-class-path $HADOOP_CONF_DIR \
--conf spark.sql.hive.convertMetastoreParquet=false \
--deploy-mode client \
--driver-memory 1G \
--master local[2] \
--executor-memory 3G \
--num-executors 1 \
--packages org.apache.spark:spark-avro_2.11:2.4.4

# Read Optimized Query
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
+---------+----------------------+
| symbol | max(ts) |
+---------+----------------------+
| GOOG | 2018-08-31 10:59:00 |
+---------+----------------------+
1 row selected (1.6 seconds)

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+

# Snapshot Query
scala> spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
+---------+----------------------+
| symbol | max(ts) |
+---------+----------------------+
| GOOG | 2018-08-31 10:59:00 |
+---------+----------------------+

scala> spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)
+----------------------+---------+----------------------+---------+------------+-----------+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+
| 20180924064636 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924070031 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |
+----------------------+---------+----------------------+---------+------------+-----------+

(11) 使用Presto对压缩后MOR表读优化查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
docker exec -it presto-worker-1 presto --server presto-coordinator-1:8090
presto> use hive.default;
USE

# Read Optimized Query
resto:default> select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
symbol | _col1
--------+---------------------
GOOG | 2018-08-31 10:59:00
(1 row)

Query 20190822_182319_00011_segyw, FINISHED, 1 node
Splits: 49 total, 49 done (100.00%)
0:01 [197 rows, 613B] [133 rows/s, 414B/s]

presto:default> select "_hoodie_commit_time", symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
_hoodie_commit_time | symbol | ts | volume | open | close
---------------------+--------+---------------------+--------+-----------+----------
20190822180250 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02
20190822181944 | GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215
(2 rows)

Query 20190822_182333_00012_segyw, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:02 [197 rows, 613B] [98 rows/s, 307B/s]

presto:default>

4 测试Hudi

当前组件版本Hadoop2.8.4、Hive2.3.3和Spark2.4.4.

1
2
3
4
5
6
7
8
9
10
-- 启动包含Hadoop、Hive和Spark的Docker环境
$ mvn pre-integration-test -DskipTests

-- 关闭容器
$ cd hudi-integ-test
$ mvn docker-compose:down

-- 启动容器
$ cd hudi-integ-test
$ mvn docker-compose:up -DdetachedMode=true

(1) 构建本地Docker容器

docker/compose/docker-compose_hadoop284_hive233_spark244.yml保证了本地jar覆盖内建jar。

ocker/build_local_docker_images.sh构建本地docker镜像。

参考资料